package com.ks.mqttServer;

import com.ks.domain.MqttRequest;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.*;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

/**
 * @program: SpringBootDemos
 * @description:
 * @author: Kangsen
 * @create: 2022-06-23 14:33
 **/
@Slf4j
public class MqttServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 保存客户端连接
     */
    private static final Collection<Channel> clientList = new HashSet<>();

    private static final Map<String, Object> msgMap = new HashMap<>();

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //只处理MQTT的消息数据
        if (msg instanceof MqttMessage) {
            MqttMessage mqttMessage = (MqttMessage) msg;
            MqttMessageType mqttMessageType = mqttMessage.fixedHeader().messageType();
            Channel channel = ctx.channel();
            switch (mqttMessageType) {
                case CONNECT:
                    log.info("客户端：{}  连接消息: {}", ctx.channel().remoteAddress(), mqttMessage);
                    conn(ctx, msg);
                    break;
                case PUBLISH:
                    log.info("客户端：{}  发布消息: {}", ctx.channel().remoteAddress(), mqttMessage);
                    publish(ctx, (MqttPublishMessage) msg);
                    break;
                case SUBSCRIBE:
                    log.info("客户端：{}  订阅消息: {}", ctx.channel().remoteAddress(), mqttMessage);
                    subscribe(channel, (MqttSubscribeMessage) mqttMessage);
                    break;
                case UNSUBSCRIBE:
                    log.info("客户端：{}  退订消息: {}", ctx.channel().remoteAddress(), mqttMessage);
                    unSubscribe(channel, (MqttUnsubscribeMessage) mqttMessage);
                    break;
                case PINGREQ:
                    log.info("客户端：{}  心跳包: {}", ctx.channel().remoteAddress(), mqttMessage);
                    pingReq(channel,mqttMessage);
                    break;
                case DISCONNECT:
                    log.info("客户端：{}  断开连接: {}", ctx.channel().remoteAddress(), mqttMessage);
                    disConnect(channel,mqttMessage);
                    break;
                case PUBACK:
                    log.info("客户端：{}  确认收到消息: {}", ctx.channel().remoteAddress(), mqttMessage);
                    pubAck(ctx, (MqttPublishMessage) mqttMessage);
                    break;
                case PUBREL:
                    log.info("客户端：{}  客户端REL收到消息: {}", ctx.channel().remoteAddress(), mqttMessage);
                    pubRel(channel,mqttMessage);
                    break;
                case PUBCOMP:
                    log.info("客户端：{}  客户端COMP收到消息: {}", ctx.channel().remoteAddress(), mqttMessage);
                    break;
                case PUBREC:
                    log.info("客户端：{}  客户端REC收到消息: {}", ctx.channel().remoteAddress(), mqttMessage);
                    break;
                default:
                    log.error("客户端：{}  未知消息 : {}", ctx.channel().remoteAddress(), mqttMessage);
                    break;
            }
        } else {
            log.error("非MQTT消息数据,丢弃");
        }
    }

    private void pubRel(Channel channel,MqttMessage mqttMessage) {
        //mqttMessage.variableHeader().
    }

    /**
     * 服务端接收到心跳包
     * @param channel
     * @param mqttMessage
     */
    private void pingReq(Channel channel,MqttMessage mqttMessage) {
        log.debug("MQTT PINGREQ RECEIVED");
        MqttMessage pingresp = new MqttMessage(
            new MqttFixedHeader(
                    MqttMessageType.PINGRESP,false,MqttQoS.AT_LEAST_ONCE,false,0
            ));
        channel.writeAndFlush(pingresp);
    }

    /**
     * 服务端收到退订的消息
     * @param channel
     * @param mqttUnsubscribeMessage
     */
    private void unSubscribe(Channel channel,MqttUnsubscribeMessage mqttUnsubscribeMessage) {
        MqttUnsubAckMessage mqttUnsubAckMessage = (MqttUnsubAckMessage) MqttMessageFactory.newMessage(
                new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_LEAST_ONCE, false, 0),
                MqttMessageIdVariableHeader.from(mqttUnsubscribeMessage.variableHeader().messageId()), null
        );
        channel.writeAndFlush(mqttUnsubAckMessage);
        disConnect(channel,mqttUnsubscribeMessage);
    }

    /**
     * 服务端收到退订消息
     * @param channel
     * @param mqttMessage
     */
    private void disConnect(Channel channel,MqttMessage mqttMessage) {
        clientList.remove(channel);
        if (channel.isActive()) {
            channel.close();
            log.info("MQTT channel:{} was closed ",channel.id().asShortText());
        }
    }


    /**
     * 设备下线
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        String ipAddress = String.valueOf(ctx.channel().remoteAddress()).replace("/", "");
        log.info("Client ： {} 断开连接！", ctx.channel().remoteAddress());
    }

    /**
     * 处理连接请求
     *
     * @param ctx
     * @param msg
     */
    public void conn(ChannelHandlerContext ctx, Object msg) {
        log.info("处理连接请求。。。");
        MqttConnAckMessage mqttConnAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_LEAST_ONCE, false, 0),
                new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, true),
                null
        );
        ctx.channel().writeAndFlush(mqttConnAckMessage);
        clientList.add(ctx.channel());
    }

    /**
     * 发布消息
     *
     * @param ctx
     * @param msg
     */
    public void publish(ChannelHandlerContext ctx, MqttPublishMessage msg) {
        String topic = msg.variableHeader().topicName();
        MqttQoS mqttQoS = msg.fixedHeader().qosLevel();
        log.info("客户端 {} 发布消息  类型：{}  主题：{}",ctx.channel().remoteAddress(), mqttQoS,topic);
        //将消息拷贝到字节缓存区
        ByteBuf duplicateByteBuf = msg.content().duplicate();
        //创建字节数组 初始化长度
        byte[] bytes = new byte[duplicateByteBuf.readByte()];
        //将数据读取到字节数组中
        duplicateByteBuf.readBytes(bytes);
        String content = new String(bytes, CharsetUtil.UTF_8);
        if (StringUtils.isBlank(content)) {
            log.error("接收到的数据是空的！");
            pubAck(ctx, msg, "接收到的数据是空的！");
            return;
        }
        log.info("接收到发布消息：{}", content);
        // 如果是qos1或者qos2类型都需要响应
        pubAck(ctx, msg, content);
        // 推送主题消息
        log.info("推送客户端 客户端消息:{}", content);
        if (msg.fixedHeader().qosLevel() == MqttQoS.AT_LEAST_ONCE || msg.fixedHeader().qosLevel() == MqttQoS.AT_MOST_ONCE) {
            //给所有的客户端推送消息
            for (Channel channel : clientList) {
                try {
                    send(channel,topic,mqttQoS,content);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    log.info("给客户端推送 订阅消息异常 ",e);
                }
            }
        }
    }

    /**
     * 发布消息的ACK
     *
     * @param ctx
     * @param msg
     */
    public void pubAck(ChannelHandlerContext ctx, MqttPublishMessage msg) {
        log.info("两个参数的  pubAck ");
    }

    /**
     * 发布消息的ACK
     *
     * @param ctx
     * @param msg
     * @param content
     */
    public void pubAck(ChannelHandlerContext ctx, MqttPublishMessage msg, String content) {
        if (msg.fixedHeader().qosLevel() == MqttQoS.AT_MOST_ONCE) {
            return;
        }
        //需要服务端响应
        if (msg.fixedHeader().qosLevel() == MqttQoS.AT_LEAST_ONCE) {
            MqttPubAckMessage mqttPubAckMessage = (MqttPubAckMessage) MqttMessageFactory.newMessage(
                    new MqttFixedHeader(MqttMessageType.PUBACK, false, msg.fixedHeader().qosLevel(), false, 0),
                    //盲猜这里需要返回接受到消息的包编号
                    MqttMessageIdVariableHeader.from(msg.variableHeader().packetId()),
                    content
            );
            ctx.channel().writeAndFlush(mqttPubAckMessage);
            return;
        }

        if (msg.fixedHeader().qosLevel() == MqttQoS.EXACTLY_ONCE) {
            //服务端第一次回复pubrec  然后接收到 pubrel 后再回 pubcomp
            MqttPubAckMessage mqttPubAckMessage = (MqttPubAckMessage) MqttMessageFactory.newMessage(
                    new MqttFixedHeader(MqttMessageType.PUBREC, false, msg.fixedHeader().qosLevel(), false, 0),
                    //盲猜这里需要返回接受到消息的包编号
                    MqttMessageIdVariableHeader.from(msg.variableHeader().packetId()),
                    content
            );
            ctx.channel().writeAndFlush(mqttPubAckMessage);
            return;
        }
    }

    /**
     * 当mqtt服务端收到订阅消息时回复
     * @param channel
     * @param subscribeMessage
     */
    public void subscribe(Channel channel,MqttSubscribeMessage subscribeMessage){
        MqttQoS mqttQoS = subscribeMessage.fixedHeader().qosLevel();
        MqttSubAckMessage mqttSubAckMessage = (MqttSubAckMessage) MqttMessageFactory.newMessage(
                new MqttFixedHeader(MqttMessageType.SUBACK,false,mqttQoS,false,0),
                MqttMessageIdVariableHeader.from(subscribeMessage.variableHeader().messageId()),
                new MqttSubAckPayload(0));
        channel.writeAndFlush(mqttSubAckMessage);
    }

    /**
     * 向客户端发布订阅消息
     * @param channel
     * @param topic
     * @param qoS
     * @param sendMessage
     * @return
     * @throws InterruptedException
     */
    public ChannelFuture send(Channel channel, String topic, MqttQoS qoS, String sendMessage) throws InterruptedException {
        MqttRequest mqttRequest = new MqttRequest();
        mqttRequest.setPayload(sendMessage.getBytes());
        MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
                new MqttFixedHeader(MqttMessageType.PUBLISH,
                        mqttRequest.isDup(),
                        qoS,
                        mqttRequest.isRetained(),
                        0
                ),
                new MqttPublishVariableHeader(topic, 0),
                Unpooled.buffer().writeBytes(mqttRequest.getPayload()));
        //这里不知道为啥记录这个id  key value 一致
        msgMap.put(publishMessage.variableHeader().packetId() + "", publishMessage.variableHeader().packetId() + "");
        if (channel.isWritable()) {
            return channel.writeAndFlush(publishMessage);
        }
        return channel.writeAndFlush(publishMessage).sync();
    }
}
